package com.ld.shieldsb.canalclient.recoder;

import com.ld.shieldsb.canalclient.model.Dml;
import com.ld.shieldsb.canalclient.model.RecordHandlerDmlModel;
import com.ld.shieldsb.canalclient.model.RecordModel;
import com.ld.shieldsb.common.core.queue.QueueEntry;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * Base class for handlers that consume the records collected by a {@link Recorder}.
 *
 * <p>
 * Depending on which queues the recorder reports as enabled, {@link #start()} launches up to three daemon worker
 * threads. Each worker repeatedly drains its queue, hands every entry to the matching abstract {@code take*Record}
 * callback and sleeps 1 ms between passes, until {@link #stop()} clears {@code running} or the worker thread is
 * interrupted.
 */
@Data
@Slf4j
public abstract class RecorderHandler {
    protected Recorder recorder;
    // Loop flag shared with the worker threads; volatile so a change made by stop() becomes visible to them.
    protected volatile boolean running = false;
    protected Thread dmlThread = null; // worker for the DML queue
    protected Thread validDmlThread = null; // worker for the valid-DML queue
    protected Thread dataThread = null; // worker for the sync-data queue
    // Handler for exceptions that escape a worker thread
    protected Thread.UncaughtExceptionHandler handler = (t, e) -> log.error("接收canal记录出错！", e);

    // Copied from the recorder in setRecorder(); decide which workers start() launches.
    protected boolean useDmlThread = false;
    protected boolean useValidDmlThread = false;
    protected boolean useDataThread = false;

    /**
     * Creates a handler whose recorder is built with the sizes {@code (0, 1000, 1000)}.
     */
    public RecorderHandler() {
        this(0, 1000, 1000);
    }

    /**
     * Stores the recorder and caches which of its queues are in use.
     *
     * @param recorder
     *            the recorder this handler reads from
     */
    private void setRecorder(Recorder recorder) {
        this.recorder = recorder;

        useDmlThread = recorder.isUseDmlQueue();
        useValidDmlThread = recorder.isUseValidDmlQueue();
        useDataThread = recorder.isUserSyncQueue();
    }

    /**
     * Creates a handler backed by a new {@link Recorder} constructed with the given sizes.
     *
     * @param dmlQueueSize
     *            first size argument passed to the recorder
     * @param validDmlSize
     *            second size argument passed to the recorder
     * @param syncQueueSize
     *            third size argument passed to the recorder
     */
    public RecorderHandler(int dmlQueueSize, int validDmlSize, int syncQueueSize) {
        setRecorder(new Recorder(dmlQueueSize, validDmlSize, syncQueueSize));
    }

    /**
     * Worker loop for the DML queue: while {@code running}, takes as many entries as the queue held at the start of
     * the pass, forwards each to {@link #takeDmlRecord(QueueEntry)}, then sleeps 1 ms. Does nothing when the recorder
     * is missing or its DML queue is disabled.
     */
    protected void processDml() {
        if (recorder != null && recorder.isUseDmlQueue()) {

            while (running) {
                try {
                    // The size is sampled once per pass, so entries added meanwhile are picked up by the next pass.
                    for (int i = 0, len = recorder.getDmlQueue().size(); i < len; i++) {
                        takeDmlRecord(recorder.takeDmlRecord());
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Interruption is a cancellation request: keep the flag set and leave the loop instead of
                    // swallowing it in the generic handler below.
                    Thread.currentThread().interrupt();
                    log.warn("DML worker interrupted, exiting", e);
                    break;
                } catch (Throwable e) {
                    // A failure on one record must not kill the worker; log it and keep polling.
                    log.error("", e);
                }
            }
        }
    }

    /**
     * Worker loop for the valid DML entries received by the handler: while {@code running}, takes as many entries as
     * the queue held at the start of the pass, forwards each to {@link #takeValidDmlRecord(QueueEntry)}, then sleeps
     * 1 ms. Does nothing when the recorder is missing or its valid-DML queue is disabled.
     *
     * @author Lv Kai
     * @date 2021-12-16 16:22:29
     */
    protected void processValidDml() {
        if (recorder != null && recorder.isUseValidDmlQueue()) {
            while (running) {

                try {
                    // The size is sampled once per pass, so entries added meanwhile are picked up by the next pass.
                    for (int i = 0, len = recorder.getValidDmlQueue().size(); i < len; i++) {
                        takeValidDmlRecord(recorder.takeValidDmlRecord());
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Interruption is a cancellation request: keep the flag set and leave the loop instead of
                    // swallowing it in the generic handler below.
                    Thread.currentThread().interrupt();
                    log.warn("Valid DML worker interrupted, exiting", e);
                    break;
                } catch (Throwable e) {
                    // A failure on one record must not kill the worker; log it and keep polling.
                    log.error("", e);
                }
            }
        }
    }

    /**
     * Worker loop for the sync-data queue: while {@code running}, takes as many entries as the queue held at the
     * start of the pass, forwards each to {@link #takeSyncRecord(QueueEntry)}, then sleeps 1 ms. Does nothing when
     * the recorder is missing or its sync queue is disabled.
     */
    protected void processSyncData() {
        if (recorder != null && recorder.isUserSyncQueue()) {

            while (running) {
                try {
                    // The size is sampled once per pass, so entries added meanwhile are picked up by the next pass.
                    for (int i = 0, len = recorder.getSyncQueue().size(); i < len; i++) {
                        takeSyncRecord(recorder.takeSyncRecord());
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Interruption is a cancellation request: keep the flag set and leave the loop instead of
                    // swallowing it in the generic handler below.
                    Thread.currentThread().interrupt();
                    log.warn("Sync data worker interrupted, exiting", e);
                    break;
                } catch (Throwable e) {
                    // A failure on one record must not kill the worker; log it and keep polling.
                    log.error("", e);
                }
            }
        }
    }

    /**
     * Receives one DML entry taken from the recorder's DML queue.
     *
     * @author Lv Kai
     * @date 2021-12-16 14:08:21
     * @param dml
     *            the queue entry returned by {@code recorder.takeDmlRecord()}
     */
    protected abstract void takeDmlRecord(QueueEntry<Dml> dml);

    /**
     * Receives one entry taken from the recorder's valid-DML queue.
     *
     * @param dml
     *            the queue entry returned by {@code recorder.takeValidDmlRecord()}
     */
    protected abstract void takeValidDmlRecord(QueueEntry<RecordHandlerDmlModel> dml);

    /**
     * Receives one sync-data entry taken from the recorder's sync queue.
     *
     * @author Lv Kai
     * @date 2021-12-16 14:08:32
     * @param recordModel
     *            the queue entry returned by {@code recorder.takeSyncRecord()}
     */
    protected abstract void takeSyncRecord(QueueEntry<RecordModel> recordModel);

    /**
     * Starts one daemon worker thread per enabled queue. Returns immediately if the handler is already running.
     */
    public void start() {
        // Do not start twice
        if (running) {
            return;
        }
        running = true;

        if (useDmlThread) {
            dmlThread = newWorker(this::processDml);
            dmlThread.start();
        }
        // Valid DML entries received by the handler
        if (useValidDmlThread) {
            validDmlThread = newWorker(this::processValidDml);
            validDmlThread.start();
        }
        // Sync-data worker
        if (useDataThread) {
            dataThread = newWorker(this::processSyncData);
            dataThread.start();
        }

    }

    /**
     * Clears the running flag and waits for every worker thread that was created to terminate. Workers are not
     * interrupted here, so each one completes its current pass before exiting. Returns immediately if the handler
     * is not running.
     */
    public void stop() {
        if (!running) {
            return;
        }
        running = false;
        joinWorker(dmlThread);
        joinWorker(validDmlThread);
        joinWorker(dataThread);

    }

    /**
     * Builds an unstarted daemon thread for the given loop with the shared uncaught-exception handler installed.
     *
     * @param task
     *            the worker loop to run
     * @return the configured thread, not yet started
     */
    private Thread newWorker(Runnable task) {
        Thread worker = new Thread(task);
        worker.setDaemon(true);
        worker.setUncaughtExceptionHandler(handler);
        return worker;
    }

    /**
     * Waits for the given worker to terminate; if the wait is interrupted, logs it and restores the interrupt flag.
     *
     * @param worker
     *            the thread to wait for; {@code null} is ignored
     */
    private void joinWorker(Thread worker) {
        // A thread joining itself would block forever, e.g. when stop() is invoked from a take*Record callback.
        if (worker == null || worker == Thread.currentThread()) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            log.error("", e);
            Thread.currentThread().interrupt();
        }
    }

}
